package com.flink_demo.demo.eventtime;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaEventTime {
	public static final String ZOOKEEPER_HOST = "192.168.2.200:2181";
	public static final String KAFKA_BROKER = "192.168.2.200:9092";
	public static final String TRANSACTION_GROUP = "kafka_demo";

	/**
	 * Event-time demo: reads comma-separated "key,count,datetime" records from the
	 * Kafka topic {@code event_time}, assigns event-time timestamps/watermarks from
	 * the datetime field, and sums field 1 over 8-second tumbling event-time windows
	 * with a 10-minute allowed lateness and a 1-second continuous trigger.
	 *
	 * <p>NOTE(review): {@code env.execute()} is intentionally not called — the demo
	 * only prints the execution plan. Add {@code env.execute()} to actually run it.
	 *
	 * @param args forwarded to {@link ParameterTool#fromArgs(String[])} as global
	 *             job parameters
	 * @throws Exception propagated from environment setup
	 */
	public static void main(String... args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		// Ask the runtime to call getCurrentWatermark() every 5 seconds.
		env.getConfig().setAutoWatermarkInterval(5000L);
		env.enableCheckpointing(500);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

		// Configure the Kafka consumer.
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER);
		kafkaProps.setProperty("group.id", TRANSACTION_GROUP);

		// The topic name is "event_time"; the default SimpleStringSchema suffices.
		DataStream<Tuple3<String, Long, String>> kafkaStream = env
				.addSource(new FlinkKafkaConsumer010<String>("event_time", new SimpleStringSchema(), kafkaProps))
				.map(new MapFunction<String, Tuple3<String, Long, String>>() {

					private static final long serialVersionUID = 1L;

					/**
					 * Splits a "key,count,datetime" CSV line into a Tuple3.
					 * Assumes exactly three comma-separated fields with a numeric
					 * second field — malformed lines will fail the job.
					 */
					@Override
					public Tuple3<String, Long, String> map(String value) throws Exception {
						String[] val = value.split(",", -1);
						return new Tuple3<String, Long, String>(val[0], Long.valueOf(val[1]), val[2]);
					}
				});

		kafkaStream.assignTimestampsAndWatermarks(new MyTimestampAndWatermarkAssigner())
				.disableChaining()
				.keyBy(1)
				.window(TumblingEventTimeWindows.of(Time.seconds(8)))
				.allowedLateness(Time.minutes(10))
				// Fire (without purging) every second of event time within each window.
				.trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
				.sum(1)
				.print();

		// Only print the plan; see the method Javadoc about the missing execute().
		System.out.println(env.getExecutionPlan());
	}

	/**
	 * Periodic watermark assigner: extracts each element's event timestamp from the
	 * third tuple field (format {@code yyyy-MM-dd HH:mm:ss}) and emits watermarks
	 * that trail the maximum timestamp seen so far by a fixed out-of-orderness bound.
	 */
	public static class MyTimestampAndWatermarkAssigner
			implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, String>> {
		private static final long serialVersionUID = 1L;
		// SimpleDateFormat is not thread-safe, but each parallel operator instance
		// holds its own deserialized copy, so single-threaded per-instance use is OK.
		private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		private final long maxOutOfOrderness = 4000L; // 4 seconds

		// Largest event timestamp observed so far; drives the watermark.
		private long currentMaxTimestamp = 0L;

		@Override
		public long extractTimestamp(Tuple3<String, Long, String> element, long previousElementTimestamp) {
			long current;
			try {
				current = sdf.parse(element.f2).getTime();
			} catch (ParseException e) {
				// Unparsable datetime: fall back to the max seen so far instead of 0,
				// so one bad record is not stamped at the epoch and dropped as late.
				e.printStackTrace();
				current = currentMaxTimestamp;
			}
			currentMaxTimestamp = Math.max(currentMaxTimestamp, current);
			System.out.println("extractTimestamp:" + element.f2 + "," + current);
			return current;
		}

		@Override
		public Watermark getCurrentWatermark() {
			// Watermark trails the largest timestamp by the out-of-orderness bound.
			Watermark waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
			System.out.println("waterMark:" + waterMark);
			return waterMark;
		}

	}
}
